This document provides a comprehensive analysis of how Apache Spark's Kafka connector decides offset ranges for each partition and determines the total number of rows to process. The analysis is based on the source code from the Kafka 0.10+ SQL connector.
Key Components Architecture
Architecture Component Explanation
The diagram above illustrates the complete architecture flow of Spark's Kafka offset management system:
Driver Components (Blue Section):
KafkaSource/KafkaMicroBatchStream: Acts as the main coordinator that starts queries, manages batch lifecycle, and tracks progress. Think of it as the "conductor" orchestrating the entire process.
KafkaOffsetReader: The "scout" that communicates with Kafka to discover what data is available. It fetches the latest and earliest offsets for each partition and handles timestamp-based queries.
KafkaOffsetRangeCalculator: The "strategist" that decides how to split the work. It takes the available offset ranges and determines how many Spark partitions to create and where to assign them.
Executor Components (Orange Section):
KafkaSourceRDD: The "blueprint" that defines how data will be read. It creates the actual partitions that will run on executors and implements the compute logic.
KafkaDataConsumer: The "worker" that does the actual data fetching. It maintains connection pools, handles retries, and processes individual records.
External System (Green Section):
Kafka Cluster: The data source containing topic partitions, offset metadata, and the actual messages.
The arrows show the flow: driver components plan the work, executors carry it out, and both interact with Kafka for different purposes (metadata versus data).
Configuration Parameters Impact
Configuration Impact Explanation
This diagram shows how configuration parameters directly affect processing characteristics:
Configuration Parameters (Left Side):
minPartitions: Like setting a minimum number of workers for a job. Even if you have small tasks, you want at least this many people working in parallel.
maxRecordsPerPartition: Like setting a maximum weight limit per worker. No single worker should handle more than this many records to prevent exhaustion (memory issues).
maxOffsetsPerTrigger: Like setting a speed limit for streaming jobs. Controls how fast you consume data to prevent overwhelming downstream systems.
failOnDataLoss: Like choosing between "stop everything if there's a problem" vs "log the problem and continue". Critical for data consistency requirements.
Processing Impact (Right Side):
Each configuration parameter affects different aspects of performance:
Partition Count: More partitions = more parallelism = faster processing (up to a point)
Memory Usage: Larger partitions = more memory per task = potential out-of-memory errors
Throughput: Rate limiting affects how quickly you can process data
Fault Tolerance: Error handling strategy affects system reliability
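To make these parameters concrete, the following is a minimal sketch of how they are supplied to the Kafka source in Scala. Broker addresses and topic names are placeholders, and maxRecordsPerPartition is assumed to be supported by the Spark version in use (it is a newer option than the other three):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offset-demo").getOrCreate()

// A sketch of how the four parameters above are supplied to the Kafka source.
// Broker addresses and topic names are placeholders.
val orders = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "orders")
  .option("minPartitions", "8")               // lower bound on Spark partitions per batch
  .option("maxRecordsPerPartition", "50000")  // upper bound on records per Spark partition
  .option("maxOffsetsPerTrigger", "1000000")  // rate limit: max offsets per micro-batch
  .option("failOnDataLoss", "true")           // fail the query if expected offsets are gone
  .load()
```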
Offset Range Calculation Algorithm
Algorithm Flow Explanation
This flowchart illustrates the step-by-step process of how Spark calculates offset ranges:
Step 1 - Input Processing:
Imagine you have a pizza delivery business with 3 delivery areas (Kafka partitions). Each area has a different number of orders waiting:
Area 1 (orders-0): 150,000 orders (from order #1000 to #151000)
Area 2 (orders-1): 80,000 orders (from order #2000 to #82000)
Area 3 (orders-2): 30,000 orders (from order #5000 to #35000)
Step 2 - Memory Management Check:
You decide each delivery driver can handle at most 50,000 orders (maxRecordsPerPartition = 50k). This prevents any single driver from being overwhelmed.
Step 3 - Splitting Oversized Areas:
Area 1: 150k orders > 50k limit → Split into 3 drivers (50k + 50k + 50k)
Area 2: 80k orders > 50k limit → Split into 2 drivers (40k + 40k)
Area 3: 30k orders < 50k limit → Keep as 1 driver (30k)
Total: 6 drivers
Step 4 - Parallelism Check:
You want at least 8 drivers working (minPartitions = 8) for efficiency, but you only have 6. So you need 2 more drivers.
Step 5 - Additional Splitting:
Take the largest remaining chunks (the 50k order areas) and split them further:
Split one 50k chunk into two 25k chunks
Split another 50k chunk into two 25k chunks
Now you have 8 drivers total
Step 6 - Driver Assignment:
Assign each driver to a specific delivery truck (executor) using a consistent method (hashing). This ensures the same driver handles the same area consistently, which improves efficiency through route familiarity (consumer reuse).
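To make the splitting logic concrete, here is a small standalone Scala sketch that mimics the two passes described above. It is an illustration under simplified assumptions, not Spark's actual KafkaOffsetRangeCalculator, whose splitting heuristics differ in detail:

```scala
// Illustrative re-implementation of the two splitting passes described above.
case class OffsetRange(topicPartition: String, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

// Pass 1: split any range larger than maxRecords into roughly equal pieces.
def splitByMaxRecords(ranges: Seq[OffsetRange], maxRecords: Long): Seq[OffsetRange] =
  ranges.flatMap { r =>
    val parts = math.ceil(r.size.toDouble / maxRecords).toInt
    (0 until parts).map { i =>
      OffsetRange(r.topicPartition,
        r.fromOffset + r.size * i / parts,
        r.fromOffset + r.size * (i + 1) / parts)
    }
  }

// Pass 2: keep halving the currently largest range until minPartitions is met.
def ensureMinPartitions(ranges: Seq[OffsetRange], minPartitions: Int): Seq[OffsetRange] = {
  var current = ranges.toVector
  while (current.size < minPartitions) {
    val idx = current.indices.maxBy(i => current(i).size)
    val largest = current(idx)
    val mid = largest.fromOffset + largest.size / 2
    current = current.patch(idx, Seq(
      largest.copy(untilOffset = mid),
      largest.copy(fromOffset = mid)), 1)
  }
  current
}

// The pizza-delivery numbers from the walk-through above:
val input = Seq(
  OffsetRange("orders-0", 1000L, 151000L),  // 150k orders
  OffsetRange("orders-1", 2000L, 82000L),   //  80k orders
  OffsetRange("orders-2", 5000L, 35000L))   //  30k orders

val sized  = splitByMaxRecords(input, maxRecords = 50000L)  // 6 ranges
val result = ensureMinPartitions(sized, minPartitions = 8)  // 8 ranges
result.foreach(println)
```

Running the sketch on the example numbers reproduces the narrative: pass 1 yields six ranges (three of 50k, two of 40k, one of 30k), and pass 2 splits two of the 50k ranges in half to reach eight.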
Detailed Partition Splitting Example
Let's trace through a complete example with a visual representation:
Initial Kafka State
Initial State Explanation
Think of this as a warehouse inventory system:
orders topic: Like a large warehouse with 3 sections (partitions)
Section 0: Contains 150,000 items (orders) numbered 1000 to 151000
Section 1: Contains 80,000 items numbered 2000 to 82000
Section 2: Contains 30,000 items numbered 5000 to 35000
payments topic: Like a smaller warehouse with 1 section
Section 0: Contains 40,000 payment records numbered 100 to 40100
Configuration: We want at least 8 workers (minPartitions) and no worker should handle more than 50,000 items (maxRecordsPerPartition).
Step 1: Apply maxRecordsPerPartition
Step 1 Explanation
This is like organizing a large warehouse shipping operation:
Initial Assessment:
orders-0: 150k items - too much for one worker (limit: 50k)
orders-1: 80k items - too much for one worker
orders-2: 30k items - manageable for one worker
payments-0: 40k items - manageable for one worker
Splitting Strategy:
orders-0: Divide 150k items into 3 equal chunks of 50k each
Worker 1 handles items 1000-51000
Worker 2 handles items 51000-101000
Worker 3 handles items 101000-151000
orders-1: Divide 80k items into 2 equal chunks of 40k each
Worker 4 handles items 2000-42000
Worker 5 handles items 42000-82000
Result: We now have 7 workers, but our target is 8 for optimal parallelism.
Step 2: Apply minPartitions
Step 2 Explanation
This is like adding one more worker to achieve optimal team size:
Gap Analysis:
We have 7 workers but need 8 for optimal efficiency. We need to split one more partition.
Selection Strategy:
Among all current partitions, we look for the largest ones that can be split without creating too much imbalance:
Three 50k partitions (orders-0 chunks) are the largest
Two 40k partitions (orders-1 chunks) are medium
One 30k partition (orders-2) is smallest
One 40k partition (payments-0) is medium
Splitting Decision:
We choose to split one of the 50k partitions (orders-0-0) because:
It's the largest, so splitting it creates the most balanced result
Splitting it into 25k + 25k creates two manageable workloads
The resulting distribution is more even
Final Team:
Now we have 8 workers with loads ranging from 25k to 50k items - much more balanced than the original 30k to 150k range.
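For reference, the resulting eight ranges can be written down explicitly; the small Scala check below uses the offsets from this example (the 26000 boundary is one possible outcome of evenly splitting the 1000-51000 chunk) and confirms that the splits still cover exactly the original 300,000 records:

```scala
// Final ranges from the worked example (topic-partition, fromOffset, untilOffset).
val finalRanges = Seq(
  ("orders-0",     1000L,  26000L),  // 25k (first half of the split 50k chunk)
  ("orders-0",    26000L,  51000L),  // 25k (second half)
  ("orders-0",    51000L, 101000L),  // 50k
  ("orders-0",   101000L, 151000L),  // 50k
  ("orders-1",     2000L,  42000L),  // 40k
  ("orders-1",    42000L,  82000L),  // 40k
  ("orders-2",     5000L,  35000L),  // 30k
  ("payments-0",    100L,  40100L))  // 40k

val total = finalRanges.map { case (_, from, until) => until - from }.sum
assert(total == 300000L)  // 150k + 80k + 30k + 40k: nothing lost or duplicated
```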
Final Partition Layout
Final Layout Explanation
This diagram shows the final "work assignment" across the computing cluster:
Executor Assignment (Like Warehouse Locations):
Executor 1: Gets all orders-0 related partitions (partitions 0, 1, 2)
This is like assigning one warehouse to handle all orders from region 0
Benefits: Can reuse connections, cache, and local optimizations
Workload: 25k + 25k + 50k = 100k records total
Executor 2: Gets the remaining orders-0 chunk and the first orders-1 chunk (partitions 3, 4)
A mixed assignment, but one that still preserves some locality
Workload: 50k + 40k = 90k records total
Executor 3: Gets the remaining partitions (the second orders-1 chunk, orders-2, and payments-0)
Workload: 40k + 30k + 40k = 110k records total
Load Balance: 2:1 ratio between largest and smallest partition (50k vs 25k) is acceptable
Parallelism: 8 tasks running simultaneously across 3 executors
Memory: Each partition uses at most 50MB (50k records × 1KB average), well within limits
Estimated Processing Time:
Assumes a processing rate of roughly 10,000 records per minute per task
Tasks finish in 2.5 to 5 minutes, an acceptable amount of skew
The total job completes in about 5 minutes, limited by the slowest (50k-record) partitions
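A sketch of the kind of sticky assignment described above, assuming executor placement is chosen by hashing the topic-partition name over the available executors. Spark's actual preferred-location logic differs in detail, but the intent is the same: the same partition lands on the same executor across batches so its cached consumer can be reused:

```scala
// Hypothetical illustration: derive a stable "preferred executor" for each
// Kafka topic-partition so its cached consumer can be reused across batches.
def preferredExecutor(topicPartition: String, executors: IndexedSeq[String]): String =
  executors(Math.floorMod(topicPartition.hashCode, executors.length))

val executors = Vector("executor-1", "executor-2", "executor-3")
Seq("orders-0", "orders-1", "orders-2", "payments-0").foreach { tp =>
  println(s"$tp -> ${preferredExecutor(tp, executors)}")
}
```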
Row Counting Mechanisms
Estimation vs Actual Counting
Row Counting Explanation
This illustrates the difference between "estimated" and "actual" record counts, like the difference between a restaurant's seating capacity and actual customers served:
Estimation Phase (Planning - Left Side):
Think of this like a restaurant manager planning for the evening:
Offset Range Calculation: "We have table numbers 1000 to 151000, so we can serve 150,000 customers"
Simple Assumption: "Each table number = one customer"
Planning Decisions: Based on this estimate, assign 3 waiters to handle 50,000 customers each
Reality Check (Execution - Right Side):
When the restaurant actually opens:
Actual Service: Some table numbers are reserved but empty (transaction metadata)
Filtering: Some reservations were cancelled (aborted transactions)
Real Count: Only 147,500 customers actually showed up and were served
Why the Difference?
Transaction Control Records: Like reservation system metadata - takes up space but isn't a real customer
Aborted Transactions: Like cancelled reservations - the table number was used but no customer came
Log Compaction: Like updating a reservation - the old entry is removed, new one added
Practical Impact:
Planning: Use estimates to decide resource allocation (number of waiters/partitions)
Execution: Count actual customers served (records processed)
Reporting: Report actual numbers, not estimates
This two-phase approach allows Spark to make good planning decisions quickly while still providing accurate final counts.
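In code terms, the difference boils down to offset arithmetic at planning time versus a counter incremented at execution time. A minimal sketch follows; the 147,500 figure is the illustrative number from the analogy above, not something that can be computed in advance:

```scala
// Planning time (driver): the estimate is pure offset arithmetic and can
// overcount, since some offsets hold control records or aborted data.
case class OffsetRange(topicPartition: String, fromOffset: Long, untilOffset: Long)

def estimatedRecords(ranges: Seq[OffsetRange]): Long =
  ranges.map(r => r.untilOffset - r.fromOffset).sum

val planned = Seq(OffsetRange("orders-0", 1000L, 151000L))
val estimated = estimatedRecords(planned)   // 150,000

// Execution time (executor): the actual count is whatever survives filtering;
// here it is simply the example figure from the text.
val actuallyRead = 147500L
println(s"estimated=$estimated actual=$actuallyRead filteredOut=${estimated - actuallyRead}")
```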
Transaction Isolation Impact
Transaction Isolation Explanation
This diagram shows how Kafka's transaction system affects record counting, like filtering valid vs invalid items on a production line:
Raw Kafka Stream (Top Section):
Imagine a manufacturing conveyor belt with different types of items:
Data Records (Green): Actual products ready for shipping
Control Records (Red): Assembly line control signals, not products
Aborted Records (Red): Defective products that failed quality control
Processing Logic (Middle Section):
Like a quality control inspector:
KafkaDataConsumer: The inspector examining each item
Record Type Check: "Is this a product or a control signal?"
Isolation Level Check: "Is this product from a completed batch or a cancelled one?"
Specific Example Walk-through:
offset: 1000: ✅ Valid order data → Count it (totalRecordsRead++)
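In the connector, the isolation level is not a dedicated Spark option; it is the underlying Kafka consumer property passed through with the kafka. prefix. A minimal sketch, with placeholder broker and topic names (check your Spark version's list of disallowed consumer options before relying on this):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-isolation-demo").getOrCreate()

// With read_committed, records from aborted transactions and transaction
// control markers are skipped, which is one reason the actual row count can
// be lower than the offset-range estimate.
val committedOnly = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "orders")                      // placeholder
  .option("kafka.isolation.level", "read_committed")  // passed to the consumer
  .load()
```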
Consumer Pool Management Strategy
Consumer Pool Management Explanation
This diagram illustrates how Spark manages Kafka consumer connections, like a restaurant managing specialized cooking stations:
Consumer Pool Architecture (Like Restaurant Stations):
Executor 1 (Like Main Kitchen):
Consumer Pool: A storage area for 16 specialized cooking tools
LRU Cache: Keeps the 16 most recently used tools ready
Hit Rate 85%: 85% of the time, the needed tool is already available
Avg Lifetime 30 min: Tools stay active for 30 minutes on average
Specific Assignments:
orders-0 → Consumer1: Dedicated pasta station
Reused across batches: Same chef handles all pasta orders
Connection: Persistent: Keeps the pasta machine running
Last used: 2 min ago: Recently active, ready for next order
orders-1 → Consumer2: Dedicated pizza station
Sticky assignment: Same chef always makes pizzas
TCP connection: Active: Oven stays heated
Active tasks: 3: Currently making 3 pizzas
Executor 2 (Like Dessert Kitchen):
Consumer Pool: Specialized for dessert making
Hit Rate 90%: Very efficient tool usage
Eviction Rate 2/hour: Rarely needs to replace tools
Specific Assignments:
payments-0 → Consumer3: High-volume ice cream station
High reuse frequency: Constantly making ice cream
Throughput: 12k rec/sec: Very fast production
Uptime: 45 min: Running efficiently for 45 minutes
orders-2 → Consumer4: Moderate-volume cake station
Moderate usage: Occasional cake orders
Fetch size: 1MB: Medium-sized batches
Idle time: 5 min: Waiting for next order
Executor 3 (Like Specialty Kitchen):
Consumer Pool: Handles specialty items
Hit Rate 82%: Good but not perfect efficiency
Memory usage: 50MB: Moderate resource consumption
Specific Assignments:
inventory-0 → Consumer5: Low-volume specialty station
Low frequency partition: Rarely used specialty items
Batch size: 1k records: Small orders
Last active: 10 min: Recently used but now idle
Pool Management Benefits:
Assignment Strategy:
Hash-based distribution: Like assigning chefs to stations based on their specialties
Consistent assignment: Same chef always handles the same type of dish
Load balancing: Evenly distribute work across all kitchens
Performance Impact:
Connection overhead: -60%: Less time spent setting up stations
Throughput increase: +40%: More efficient production
Latency reduction: -25%: Faster order fulfillment
Memory Management:
Pool size: 16 per executor: Each kitchen has 16 specialized tools
Memory per consumer: 3MB: Each tool uses 3MB of storage
Total overhead: 144MB: Total memory cost across all kitchens (16 consumers × 3MB × 3 executors)
Real-World Analogy:
Imagine a restaurant where:
Instead of setting up a new cooking station for each order, chefs reuse existing stations
Popular dishes (like orders-0) get dedicated, always-ready stations
Less popular dishes (like inventory-0) share stations that are set up on demand
The result is faster service, less waste, and better resource utilization
This consumer pool strategy is crucial for high-performance Kafka processing because establishing new connections is expensive, but reusing existing connections is very fast.
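The pool behaviour described above is tuned through Spark-level configuration rather than per-query options. A hedged sketch follows; the keys are the ones documented for Spark 3.x's Kafka integration (verify them against your version), and the values simply mirror the illustrative numbers in the diagram, not Spark's defaults:

```scala
import org.apache.spark.sql.SparkSession

// Consumer-pool tuning happens at session/cluster level, not per query.
// The values echo the example figures above (16 consumers, ~30 min lifetime).
val spark = SparkSession.builder()
  .appName("kafka-consumer-pool-demo")
  .config("spark.kafka.consumer.cache.capacity", "16")  // max cached consumers per executor
  .config("spark.kafka.consumer.cache.timeout", "30m")  // evict consumers idle longer than this
  .getOrCreate()
```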
Performance Optimization Strategies
Partition Sizing Guidelines
Partition Sizing Explanation
This diagram explains how to choose the right partition size, like determining the optimal workload for employees:
Partition Size Factors (Top Center):
Think of this like managing a call center:
Record count: How many calls each agent handles
Record size: How complex each call is (simple inquiry vs. complex problem)
Processing time: How long each call takes
Memory usage: How much information each agent needs to keep in mind
Three Scenarios:
Too Small Partitions (Red - Left):
Like having agents handle only 1-2 calls per hour:
< 10k records: Each agent gets very few calls
High task overhead: More time spent getting ready than working
Poor resource utilization: Agents sitting idle most of the time
Excessive coordinator load: Supervisor spends more time assigning work than agents spend working
Too Large Partitions (Red - Right):
Like having agents handle 100+ calls per hour:
> 500k records: Each agent overwhelmed with calls
Memory pressure: Agents can't keep track of all the information
Long task duration: Some agents take hours to finish while others wait
Straggler tasks: The whole team waits for the slowest agent
Optimal Partitions (Green - Center):
Like having agents handle 20-50 calls per hour:
50k-200k records: Manageable workload per agent
50-200MB memory: Reasonable information to keep track of
1-5 min duration: Predictable completion times
Balanced load: All agents finish around the same time
Sizing Recommendations:
Formula for Optimal Size:
Optimal records per partition = Available Memory / (Record Size × Safety Factor)
Example: 1GB / (1KB × 2) = 500k records maximum
Think of this like: "How many phone calls can an agent handle given their workspace (memory) and the complexity of calls (record size)?"
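The same back-of-the-envelope formula expressed as code, as a sketch: the inputs are assumptions to be replaced with measured values, and decimal units are used so the result matches the 500k example above:

```scala
// Rough sizing helper mirroring the formula above:
//   optimal records per partition = available memory / (record size × safety factor)
def maxRecordsFor(availableMemoryBytes: Long,
                  avgRecordSizeBytes: Long,
                  safetyFactor: Int = 2): Long =
  availableMemoryBytes / (avgRecordSizeBytes * safetyFactor)

// 1 GB of usable task memory, ~1 KB records, 2x safety margin -> 500,000 records.
println(maxRecordsFor(availableMemoryBytes = 1000000000L, avgRecordSizeBytes = 1000L))
```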
Configuration Tuning:
maxRecordsPerPartition: 100k: Cap at 100k records per partition
minPartitions: CPU cores × 2: Ensure at least 2 partitions per CPU core
Dynamic adjustment: Monitor and adjust based on actual performance
Performance Impact:
Throughput: More partitions generally mean higher throughput (more agents working)
Latency: Larger partitions mean longer processing time (agents take longer per task)
Sweet spot: Around 100k records balances both concerns
Troubleshooting Guide:
Memory Issues (Red):
When agents run out of workspace:
Reduce maxRecordsPerPartition: Give each agent fewer calls
Increase executor memory: Give each agent a bigger workspace
Enable off-heap storage: Use external storage for complex cases
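A sketch of the memory-side knobs from this list, expressed as session configuration. The keys are standard Spark configuration settings, the values are placeholders, and reducing maxRecordsPerPartition itself is a read option on the Kafka source (as in the earlier sketches), not a Spark conf:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values; executor memory is normally fixed at application launch
// (e.g. via spark-submit), so set it before the SparkContext is created.
val spark = SparkSession.builder()
  .appName("kafka-memory-tuning-demo")
  .config("spark.executor.memory", "8g")            // a bigger workspace per worker
  .config("spark.memory.offHeap.enabled", "true")   // allow off-heap storage
  .config("spark.memory.offHeap.size", "2g")        // off-heap budget
  .getOrCreate()
```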
Performance Issues (Yellow):
When work is too slow:
Increase parallelism: Hire more agents
Check consumer reuse: Ensure agents aren't wasting time on setup
Monitor partition skew: Check if some agents have harder calls than others
Load Balancing (Green):
When some agents are overworked:
Increase minPartitions: Spread work across more agents
Monitor task duration: Check if some tasks consistently take longer
Check preferred locations: Ensure agents are assigned to their best-suited work
This approach ensures optimal resource utilization while maintaining predictable performance.
Monitoring and Troubleshooting
Key Metrics Dashboard
Monitoring and Troubleshooting Explanation
This diagram shows a comprehensive monitoring system, like a hospital's patient monitoring dashboard:
Real-time Metrics (Top Section - Like Vital Signs):
Throughput Metrics (Blue):
Like monitoring a patient's heart rate and blood pressure:
Records/sec: 50k: Processing 50,000 records per second (like heart rate)
Batches/min: 12: Completing 12 batches per minute (like breathing rate)
Lag: 2.5k records: 2,500 records behind latest (like blood pressure)
Latency: P99 < 200ms: 99% of requests complete under 200ms (like response time)
Resource Utilization (Green):
Like monitoring organ function:
CPU: 75%: Processors working at 75% capacity (like brain activity)
Memory: 2.8GB/4GB: Using 70% of available memory (like blood volume)
Network: 100MB/sec: Data flowing at 100MB per second (like blood flow)
Disk I/O: 50MB/sec: Reading/writing at 50MB per second (like kidney function)
Consumer Metrics (Light Green):
Like monitoring specific treatment effectiveness:
Pool hit rate: 85%: Connection reuse working 85% of the time (like medication effectiveness)
Connection reuse: 90%: Avoiding connection overhead 90% of the time (like treatment efficiency)
Fetch latency: 15ms: Taking 15ms to get data (like response time to treatment)
Poll frequency: 100/sec: Checking for new data 100 times per second (like monitoring frequency)
Alert Conditions (Middle Section - Like Medical Alerts):
Performance Alerts (Red):
Like critical vital signs:
Throughput < 30k rec/sec: System processing too slowly (like low heart rate)
Latency > 1000ms: Responses taking too long (like delayed reflexes)
Error rate > 1%: Too many failures (like high fever)
Consumer lag > 10k: Falling too far behind (like irregular heartbeat)
Resource Alerts (Yellow):
Like warning signs:
Memory usage > 90%: Almost out of memory (like dehydration)
GC time > 200ms: Garbage collection taking too long (like slow metabolism)
CPU usage > 90%: Processors overworked (like high stress)
Disk space < 10GB: Running out of storage (like low blood sugar)
Kafka Alerts (Pink):
Like external system failures:
Broker down: Data source unavailable (like IV disconnected)
Partition offline: Part of data unavailable (like blocked artery)
Replication lag > 5 min: Backup systems behind (like backup equipment delayed)
Topic deletion: Data source removed (like medication discontinued)
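Several of the performance alerts above can be checked directly from the per-batch progress events Spark already emits. A sketch using StreamingQueryListener (the listener API is real; the 30k records/sec threshold is the illustrative figure from the alert list, not a recommended value):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("kafka-monitoring-demo").getOrCreate()

// Log per-batch throughput and flag batches below the example alert threshold.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} rows=${p.numInputRows} rows/sec=${p.processedRowsPerSecond}")
    if (p.processedRowsPerSecond < 30000) {
      println(s"ALERT: throughput below 30k records/sec for query ${p.id}")
    }
  }
})
```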
Troubleshooting Actions (Bottom Section - Like Medical Treatments):
Scale Out (Green):
Like adding more medical staff:
Increase executors: Add more workers (like more nurses)
Add more partitions: Divide work more (like more treatment rooms)
Boost parallelism: Work on more things simultaneously (like parallel treatments)
Load balancing: Distribute work evenly (like even patient distribution)
Configuration Tuning (Purple):
Like adjusting medication dosages:
Adjust partition size: Change workload per worker (like treatment intensity)
Optimize batch size: Change how much work per batch (like medication frequency)
Tune consumer props: Adjust connection settings (like IV flow rate)
Kafka Optimization (Light Blue):
Like improving hospital infrastructure:
Increase retention: Keep data longer (like longer patient records)
Partition rebalancing: Redistribute data (like patient redistribution)
Broker scaling: Add more data servers (like more hospital wings)
Network tuning: Improve data flow (like better communication systems)
Monitoring Philosophy:
Preventive: Watch metrics before problems occur
Reactive: Alert when thresholds are exceeded
Corrective: Take specific actions to fix issues
Continuous: Monitor the effectiveness of corrections
This comprehensive monitoring approach ensures that both the symptoms (performance metrics) and the causes (resource constraints, external dependencies) are tracked and addressed systematically.
Best Practices Summary
Configuration Best Practices
Best Practices Explanation
This diagram outlines proven configurations and practices, like a comprehensive operations manual:
Production Configuration (Top Section - Like Basic Operating Procedures):
Core Settings (Blue):
Like fundamental business rules:
minPartitions: CPU cores × 2: Ensure at least 2 tasks per CPU core for optimal parallelism (like having 2 workers per workstation)
maxRecordsPerPartition: 100k: Cap partition size to prevent memory issues (like limiting case load per employee)
maxOffsetsPerTrigger: 1M: Limit batch size for streaming to prevent overwhelming (like limiting orders per hour)
failOnDataLoss: true: Stop processing if data is lost to ensure accuracy (like stopping production if quality checks fail)
Kafka Consumer Props (Yellow):
Like supplier relationship settings:
fetch.max.bytes: 50MB: Maximum data to fetch in one request (like maximum order size)
max.partition.fetch.bytes: 10MB: Maximum per-partition fetch (like maximum per-supplier order)
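The consumer properties above are not top-level source options; they are handed to the underlying Kafka consumer by prefixing the property name with "kafka.". A sketch with placeholder brokers and topics, using the byte limits from this summary:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-best-practices-demo").getOrCreate()

// Kafka consumer properties are passed through via the "kafka." prefix.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  .option("subscribe", "orders,payments")
  .option("kafka.fetch.max.bytes", (50 * 1024 * 1024).toString)            // 50MB per fetch request
  .option("kafka.max.partition.fetch.bytes", (10 * 1024 * 1024).toString)  // 10MB per partition fetch
  .load()
```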
General Practices:
Start with proven defaults: Use battle-tested configurations
Monitor and adjust: Continuously optimize based on actual metrics
Plan for growth: Build in capacity for future needs
Prepare for failures: Implement comprehensive error handling and recovery
These best practices represent years of experience running Kafka-based Spark applications in production environments, providing a solid foundation for reliable, high-performance data processing.